1 /**
2 * Copyright 2014 Netflix, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package rx.observables;
17
18 import java.util.Iterator;
19 import java.util.NoSuchElementException;
20 import java.util.concurrent.CountDownLatch;
21 import java.util.concurrent.Future;
22 import java.util.concurrent.atomic.AtomicReference;
23
24 import rx.Observable;
25 import rx.Subscriber;
26 import rx.Subscription;
27 import rx.functions.Action1;
28 import rx.functions.Func1;
29 import rx.internal.operators.BlockingOperatorLatest;
30 import rx.internal.operators.BlockingOperatorMostRecent;
31 import rx.internal.operators.BlockingOperatorNext;
32 import rx.internal.operators.BlockingOperatorToFuture;
33 import rx.internal.operators.BlockingOperatorToIterator;
34 import rx.internal.util.UtilityFunctions;
35
36 /**
37 * {@code BlockingObservable} is a variety of {@link Observable} that provides blocking operators. It can be
38 * useful for testing and demo purposes, but is generally inappropriate for production applications (if you
39 * think you need to use a {@code BlockingObservable} this is usually a sign that you should rethink your
40 * design).
41 * <p>
42 * You construct a {@code BlockingObservable} from an {@code Observable} with {@link #from(Observable)} or
43 * {@link Observable#toBlocking()}.
44 * <p>
45 * The documentation for this interface makes use of a form of marble diagram that has been modified to
46 * illustrate blocking operators. The following legend explains these marble diagrams:
47 * <p>
48 * <img width="640" height="301" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.legend.png" alt="">
49 *
50 * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Blocking-Observable-Operators">RxJava wiki: Blocking
51 * Observable Operators</a>
52 * @param <T>
53 * the type of item emitted by the {@code BlockingObservable}
54 */
55 public final class BlockingObservable<T> {
56
57 private final Observable<? extends T> o;
58
59 private BlockingObservable(Observable<? extends T> o) {
60 this.o = o;
61 }
62
63 /**
64 * Converts an {@link Observable} into a {@code BlockingObservable}.
65 *
66 * @param o
67 * the {@link Observable} you want to convert
68 * @return a {@code BlockingObservable} version of {@code o}
69 */
70 public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
71 return new BlockingObservable<T>(o);
72 }
73
74 /**
75 * Invokes a method on each item emitted by this {@code BlockingObservable} and blocks until the Observable
76 * completes.
77 * <p>
78 * <em>Note:</em> This will block even if the underlying Observable is asynchronous.
79 * <p>
80 * <img width="640" height="330" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.forEach.png" alt="">
81 * <p>
82 * This is similar to {@link Observable#subscribe(Subscriber)}, but it blocks. Because it blocks it does not
83 * need the {@link Subscriber#onCompleted()} or {@link Subscriber#onError(Throwable)} methods. If the
84 * underlying Observable terminates with an error, rather than calling {@code onError}, this method will
85 * throw an exception.
86 *
87 * @param onNext
88 * the {@link Action1} to invoke for each item emitted by the {@code BlockingObservable}
89 * @throws RuntimeException
90 * if an error occurs
91 * @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a>
92 */
93 public void forEach(final Action1<? super T> onNext) {
94 final CountDownLatch latch = new CountDownLatch(1);
95 final AtomicReference<Throwable> exceptionFromOnError = new AtomicReference<Throwable>();
96
97 /*
98 * Use 'subscribe' instead of 'unsafeSubscribe' for Rx contract behavior
99 * as this is the final subscribe in the chain.
100 */
101 Subscription subscription = o.subscribe(new Subscriber<T>() {
102 @Override
103 public void onCompleted() {
104 latch.countDown();
105 }
106
107 @Override
108 public void onError(Throwable e) {
109 /*
110 * If we receive an onError event we set the reference on the
111 * outer thread so we can git it and throw after the
112 * latch.await().
113 *
114 * We do this instead of throwing directly since this may be on
115 * a different thread and the latch is still waiting.
116 */
117 exceptionFromOnError.set(e);
118 latch.countDown();
119 }
120
121 @Override
122 public void onNext(T args) {
123 onNext.call(args);
124 }
125 });
126 // block until the subscription completes and then return
127 try {
128 latch.await();
129 } catch (InterruptedException e) {
130 subscription.unsubscribe();
131 // set the interrupted flag again so callers can still get it
132 // for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780
133 Thread.currentThread().interrupt();
134 // using Runtime so it is not checked
135 throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
136 }
137
138 if (exceptionFromOnError.get() != null) {
139 if (exceptionFromOnError.get() instanceof RuntimeException) {
140 throw (RuntimeException) exceptionFromOnError.get();
141 } else {
142 throw new RuntimeException(exceptionFromOnError.get());
143 }
144 }
145 }
146
147 /**
148 * Returns an {@link Iterator} that iterates over all items emitted by this {@code BlockingObservable}.
149 * <p>
150 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.getIterator.png" alt="">
151 *
152 * @return an {@link Iterator} that can iterate over the items emitted by this {@code BlockingObservable}
153 * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
154 */
155 public Iterator<T> getIterator() {
156 return BlockingOperatorToIterator.toIterator(o);
157 }
158
159 /**
160 * Returns the first item emitted by this {@code BlockingObservable}, or throws
161 * {@code NoSuchElementException} if it emits no items.
162 *
163 * @return the first item emitted by this {@code BlockingObservable}
164 * @throws NoSuchElementException
165 * if this {@code BlockingObservable} emits no items
166 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
167 */
168 public T first() {
169 return blockForSingle(o.first());
170 }
171
172 /**
173 * Returns the first item emitted by this {@code BlockingObservable} that matches a predicate, or throws
174 * {@code NoSuchElementException} if it emits no such item.
175 *
176 * @param predicate
177 * a predicate function to evaluate items emitted by this {@code BlockingObservable}
178 * @return the first item emitted by this {@code BlockingObservable} that matches the predicate
179 * @throws NoSuchElementException
180 * if this {@code BlockingObservable} emits no such items
181 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
182 */
183 public T first(Func1<? super T, Boolean> predicate) {
184 return blockForSingle(o.first(predicate));
185 }
186
187 /**
188 * Returns the first item emitted by this {@code BlockingObservable}, or a default value if it emits no
189 * items.
190 *
191 * @param defaultValue
192 * a default value to return if this {@code BlockingObservable} emits no items
193 * @return the first item emitted by this {@code BlockingObservable}, or the default value if it emits no
194 * items
195 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
196 */
197 public T firstOrDefault(T defaultValue) {
198 return blockForSingle(o.map(UtilityFunctions.<T>identity()).firstOrDefault(defaultValue));
199 }
200
201 /**
202 * Returns the first item emitted by this {@code BlockingObservable} that matches a predicate, or a default
203 * value if it emits no such items.
204 *
205 * @param defaultValue
206 * a default value to return if this {@code BlockingObservable} emits no matching items
207 * @param predicate
208 * a predicate function to evaluate items emitted by this {@code BlockingObservable}
209 * @return the first item emitted by this {@code BlockingObservable} that matches the predicate, or the
210 * default value if this {@code BlockingObservable} emits no matching items
211 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
212 */
213 public T firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
214 return blockForSingle(o.filter(predicate).map(UtilityFunctions.<T>identity()).firstOrDefault(defaultValue));
215 }
216
217 /**
218 * Returns the last item emitted by this {@code BlockingObservable}, or throws
219 * {@code NoSuchElementException} if this {@code BlockingObservable} emits no items.
220 * <p>
221 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.last.png" alt="">
222 *
223 * @return the last item emitted by this {@code BlockingObservable}
224 * @throws NoSuchElementException
225 * if this {@code BlockingObservable} emits no items
226 * @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a>
227 */
228 public T last() {
229 return blockForSingle(o.last());
230 }
231
232 /**
233 * Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or throws
234 * {@code NoSuchElementException} if it emits no such items.
235 * <p>
236 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.last.p.png" alt="">
237 *
238 * @param predicate
239 * a predicate function to evaluate items emitted by the {@code BlockingObservable}
240 * @return the last item emitted by the {@code BlockingObservable} that matches the predicate
241 * @throws NoSuchElementException
242 * if this {@code BlockingObservable} emits no items
243 * @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a>
244 */
245 public T last(final Func1<? super T, Boolean> predicate) {
246 return blockForSingle(o.last(predicate));
247 }
248
249 /**
250 * Returns the last item emitted by this {@code BlockingObservable}, or a default value if it emits no
251 * items.
252 * <p>
253 * <img width="640" height="310" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.png" alt="">
254 *
255 * @param defaultValue
256 * a default value to return if this {@code BlockingObservable} emits no items
257 * @return the last item emitted by the {@code BlockingObservable}, or the default value if it emits no
258 * items
259 * @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a>
260 */
261 public T lastOrDefault(T defaultValue) {
262 return blockForSingle(o.map(UtilityFunctions.<T>identity()).lastOrDefault(defaultValue));
263 }
264
265 /**
266 * Returns the last item emitted by this {@code BlockingObservable} that matches a predicate, or a default
267 * value if it emits no such items.
268 * <p>
269 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.lastOrDefault.p.png" alt="">
270 *
271 * @param defaultValue
272 * a default value to return if this {@code BlockingObservable} emits no matching items
273 * @param predicate
274 * a predicate function to evaluate items emitted by this {@code BlockingObservable}
275 * @return the last item emitted by this {@code BlockingObservable} that matches the predicate, or the
276 * default value if it emits no matching items
277 * @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX documentation: Last</a>
278 */
279 public T lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
280 return blockForSingle(o.filter(predicate).map(UtilityFunctions.<T>identity()).lastOrDefault(defaultValue));
281 }
282
283 /**
284 * Returns an {@link Iterable} that always returns the item most recently emitted by this
285 * {@code BlockingObservable}.
286 * <p>
287 * <img width="640" height="490" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.mostRecent.png" alt="">
288 *
289 * @param initialValue
290 * the initial value that the {@link Iterable} sequence will yield if this
291 * {@code BlockingObservable} has not yet emitted an item
292 * @return an {@link Iterable} that on each iteration returns the item that this {@code BlockingObservable}
293 * has most recently emitted
294 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
295 */
296 public Iterable<T> mostRecent(T initialValue) {
297 return BlockingOperatorMostRecent.mostRecent(o, initialValue);
298 }
299
300 /**
301 * Returns an {@link Iterable} that blocks until this {@code BlockingObservable} emits another item, then
302 * returns that item.
303 * <p>
304 * <img width="640" height="490" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.next.png" alt="">
305 *
306 * @return an {@link Iterable} that blocks upon each iteration until this {@code BlockingObservable} emits
307 * a new item, whereupon the Iterable returns that item
308 * @see <a href="http://reactivex.io/documentation/operators/takelast.html">ReactiveX documentation: TakeLast</a>
309 */
310 public Iterable<T> next() {
311 return BlockingOperatorNext.next(o);
312 }
313
314 /**
315 * Returns an {@link Iterable} that returns the latest item emitted by this {@code BlockingObservable},
316 * waiting if necessary for one to become available.
317 * <p>
318 * If this {@code BlockingObservable} produces items faster than {@code Iterator.next} takes them,
319 * {@code onNext} events might be skipped, but {@code onError} or {@code onCompleted} events are not.
320 * <p>
321 * Note also that an {@code onNext} directly followed by {@code onCompleted} might hide the {@code onNext}
322 * event.
323 *
324 * @return an Iterable that always returns the latest item emitted by this {@code BlockingObservable}
325 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
326 */
327 public Iterable<T> latest() {
328 return BlockingOperatorLatest.latest(o);
329 }
330
331 /**
332 * If this {@code BlockingObservable} completes after emitting a single item, return that item, otherwise
333 * throw a {@code NoSuchElementException}.
334 * <p>
335 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.png" alt="">
336 *
337 * @return the single item emitted by this {@code BlockingObservable}
338 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
339 */
340 public T single() {
341 return blockForSingle(o.single());
342 }
343
344 /**
345 * If this {@code BlockingObservable} completes after emitting a single item that matches a given predicate,
346 * return that item, otherwise throw a {@code NoSuchElementException}.
347 * <p>
348 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.single.p.png" alt="">
349 *
350 * @param predicate
351 * a predicate function to evaluate items emitted by this {@link BlockingObservable}
352 * @return the single item emitted by this {@code BlockingObservable} that matches the predicate
353 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
354 */
355 public T single(Func1<? super T, Boolean> predicate) {
356 return blockForSingle(o.single(predicate));
357 }
358
359 /**
360 * If this {@code BlockingObservable} completes after emitting a single item, return that item; if it emits
361 * more than one item, throw an {@code IllegalArgumentException}; if it emits no items, return a default
362 * value.
363 * <p>
364 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.singleOrDefault.png" alt="">
365 *
366 * @param defaultValue
367 * a default value to return if this {@code BlockingObservable} emits no items
368 * @return the single item emitted by this {@code BlockingObservable}, or the default value if it emits no
369 * items
370 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
371 */
372 public T singleOrDefault(T defaultValue) {
373 return blockForSingle(o.map(UtilityFunctions.<T>identity()).singleOrDefault(defaultValue));
374 }
375
376 /**
377 * If this {@code BlockingObservable} completes after emitting a single item that matches a predicate,
378 * return that item; if it emits more than one such item, throw an {@code IllegalArgumentException}; if it
379 * emits no items, return a default value.
380 * <p>
381 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.singleOrDefault.p.png" alt="">
382 *
383 * @param defaultValue
384 * a default value to return if this {@code BlockingObservable} emits no matching items
385 * @param predicate
386 * a predicate function to evaluate items emitted by this {@code BlockingObservable}
387 * @return the single item emitted by the {@code BlockingObservable} that matches the predicate, or the
388 * default value if no such items are emitted
389 * @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX documentation: First</a>
390 */
391 public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
392 return blockForSingle(o.filter(predicate).map(UtilityFunctions.<T>identity()).singleOrDefault(defaultValue));
393 }
394
395 /**
396 * Returns a {@link Future} representing the single value emitted by this {@code BlockingObservable}.
397 * <p>
398 * If {@link BlockingObservable} emits more than one item, {@link java.util.concurrent.Future} will receive an
399 * {@link java.lang.IllegalArgumentException}. If {@link BlockingObservable} is empty, {@link java.util.concurrent.Future}
400 * will receive an {@link java.util.NoSuchElementException}.
401 * <p>
402 * If the {@code BlockingObservable} may emit more than one item, use {@code Observable.toList().toBlocking().toFuture()}.
403 * <p>
404 * <img width="640" height="395" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toFuture.png" alt="">
405 *
406 * @return a {@link Future} that expects a single item to be emitted by this {@code BlockingObservable}
407 * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
408 */
409 public Future<T> toFuture() {
410 return BlockingOperatorToFuture.toFuture(o);
411 }
412
413 /**
414 * Converts this {@code BlockingObservable} into an {@link Iterable}.
415 * <p>
416 * <img width="640" height="315" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toIterable.png" alt="">
417 *
418 * @return an {@link Iterable} version of this {@code BlockingObservable}
419 * @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
420 */
421 public Iterable<T> toIterable() {
422 return new Iterable<T>() {
423 @Override
424 public Iterator<T> iterator() {
425 return getIterator();
426 }
427 };
428 }
429
430 /**
431 * Helper method which handles the actual blocking for a single response.
432 * <p>
433 * If the {@link Observable} errors, it will be thrown right away.
434 *
435 * @return the actual item
436 */
437 private T blockForSingle(final Observable<? extends T> observable) {
438 final AtomicReference<T> returnItem = new AtomicReference<T>();
439 final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
440 final CountDownLatch latch = new CountDownLatch(1);
441
442 Subscription subscription = observable.subscribe(new Subscriber<T>() {
443 @Override
444 public void onCompleted() {
445 latch.countDown();
446 }
447
448 @Override
449 public void onError(final Throwable e) {
450 returnException.set(e);
451 latch.countDown();
452 }
453
454 @Override
455 public void onNext(final T item) {
456 returnItem.set(item);
457 }
458 });
459
460 try {
461 latch.await();
462 } catch (InterruptedException e) {
463 subscription.unsubscribe();
464 Thread.currentThread().interrupt();
465 throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
466 }
467
468 if (returnException.get() != null) {
469 if (returnException.get() instanceof RuntimeException) {
470 throw (RuntimeException) returnException.get();
471 } else {
472 throw new RuntimeException(returnException.get());
473 }
474 }
475
476 return returnItem.get();
477 }
478 }